本文作者
作者:Amter
链接:
https://juejin.cn/post/6898862722357100557
本文由作者授权发布。
RxJava的版本从发布到现在,已经经历了好多个版本了,虽然源码在不断的修改,但是不知你有没有发现,RxJava的主体架构还是没有变化,为什么呢?可以说是RxJava架构决定了它的特性,比如代码逻辑的简洁以及操作符带来极强的扩展能力,这些在RxJava迭代了这么多个版本之后,这些特性,没有减少,反而大大的增强了,这个特性,就是响应式编程,那么接下来,就来讲讲RxJava为什么会有这种特性,以及带来其特性不变的本质是啥!
本文主要讲解RxJava的架构思想,不会涉及到大量的源码分析,请放心食用,文章篇幅较长,建议收藏,慢慢品尝!
implementation 'io.reactivex.rxjava2:rxjava:2.1.4'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
不知你是否会有这样的焦虑,框架的更新有时候会让人感叹!
虽然RxJava的版本更新没有那么频繁,但是每次的更新,总会让人感觉,之前刚看的源码,是不是前功尽弃了😭,源码一更新,我又得继续去学习;
但是不知你是否有这样想过,框架一直在更新,底层有没有不变的东西,今天很高兴告诉你,RxJava框架有,那么这个不变的东西是什么呢?
架构
没错,RxJava虽然迭代了几个版本,但是其底层的架构还是没有怎么变动,为什么呢?因为其特性就决定了它的架构不会有多大的变化;
它的特性有哪些?简单明了的代码逻辑,强大的操作符,而这些特性正是响应式编程的思想;
就好比一栋房子,其特性有抗震,防风,那么其底层的架构就是必然是按着抗震和防风的特性去建造的,而一旦建造成功,其具有的特性,不会跟着房子的装修而变化,那么其底层的架构也是同样的道理;
那么你想知道是什么架构来实现这种特性的吗?
别急,下面我们先来讲一讲RxJava涉及到的设计模式,为什么设计模式这么重要呢?
因为设计模式是架构的基础,我们怎么设计才能让这个架构具有某种特性,这个和设计模式分不开;
2.1、观察者模式
观察者模式,或许是我们最熟悉的设计模式,为什么呢?因为我们在代码里无时无刻都在使用着它;
它就是View的点击事件;
为什么说View的点击事件是观察者模式呢?下面我们先来看看观察者模式的定义;
定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知。
简单来说,就是被观察者与观察者之间存在着一对多的关系,一个被观察者可以被多个观察者依赖,当被观察者变化时,会通知到观察者;
而View的点击事件这里只是简单的一对一的关系,但是其实是可以实现一对多的关系;
而观察者模式的本质是当被观察者变化时,会通知到观察者,当View被点击的时候,会通过回调的监听通知到观察者,而观察者和被观察者之间存在订阅关系的时候,观察者才会被通知到;
而View的点击事件,是通过设置监听方法来实现订阅的,这就是我们最熟悉的观察者模式;
2.2、RxJava的观察者模式是怎样的呢?
RxJava的观察者模式,是扩展的观察者模式,为啥要叫扩展呢?因为它和普通的观察者模式不太一样,扩展的观察者模式,不止一个通知观察者的方法,它有好几个,下面我们来看看它的实现原理吧!
首先先来看一下RxJava观察者模式涉及到的几个类:
Observable:被观察者
Observer:观察者
Event:被观察者通知观察者的事件
Subscribe:订阅
看完下面这个图,你的心目中是不是已经对RxJava的观察者模式一目了然了;
下面我们来看一RxJava的事件类型,这个事件是被观察者用来通知观察者的,也就是Event,而Event可以分为下面几种类型,这个我们了解一下即可;
Next:常规事件,可以传递各种各样的数据;
Complete:结束事件,当观察者接收到结束事件后,就不会再接收后续被观察者发送来的事件;
Error:异常事件,当被观察者发送异常事件后,那么其他的事件就不会再继续发送了;
下面我用示例代码来讲一次RxJava的观察者模式;
首先定义一个观察者Observer:
public abstract class Observer<T> {
// 和被观察者订阅后,会回调这个方法;
public abstract void onSubscribe(Emitter emitter);
// 传递常规事件,用于传递数据
public abstract void onNext(T t);
// 传递异常事件
public abstract void onError(Throwable e);
// 传递结束事件
public abstract void onComplete();
}
Observer类的方法很简单,都是回调,这里有个新的接口类Emitter,这个Emitter是用来干嘛的呢?
我们暂且把这个Emitter称为发射器,主要用于发射事件;
public interface Emitter<T> {
void onNext(T value);
void onError(Throwable error);
void onComplete();
}
实现逻辑就是通过包装Observer,里面最终是通过Observer来进行调用的,来看看这个类有哪些方法;
public class CreateEmitter<T> implements Emitter<T> {
final Observer<T> observer;
CreateEmitter(Observer<T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
observer.onNext(t);
}
@Override
public void onError(Throwable error) {
observer.onError(error);
}
@Override
public void onComplete() {
observer.onComplete();
}
}
里面的具体实现就是通过Observer对象来进行调用;
下面我们来看一下被观察者Observable是怎么实现的;
public abstract class Observable<T> {
// 实现订阅的逻辑
public void subscribe(Observer<T> observer){
// 通过将传进来的observer包装成CreateEmitter,用于回调
CreateEmitter<T> emitter = new CreateEmitter<T>(observer);
// 回调订阅成功的方法
observer.onSubscribe(emitter);
// 回调发射器emitter
subscribe(emitter);
}
// 订阅成功后,进行回调
public abstract void subscribe(Emitter<T> emitter);
}
这个类的逻辑很简单,就两步,第一步,进行订阅,第二步,回调Emitter对象,用于发射事件;
那么我们来看看怎么调用吧;
private void observer() {
// 第一步,创建被观察者
Observable<String> observable = new Observable<String>() {
@Override
public void subscribe(Emitter<String> emitter) {
emitter.onNext("第一次");
emitter.onNext("第二次");
emitter.onNext("第三次");
emitter.onComplete();
}
};
// 第二步,创建观察者
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Emitter emitter) {
Log.i("TAG", " onSubscribe ");
}
@Override
public void onNext(String s) {
Log.i("TAG", " onNext s:" + s);
}
@Override
public void onError(Throwable e) {
Log.i("TAG", " onError e:" + e.toString());
}
@Override
public void onComplete() {
Log.i("TAG", " onComplete ");
}
};
// 第三步,被观察者订阅观察者
observable.subscribe(observer);
}
这里是使用逻辑很简单,分为三步:
第一步:创建被观察者Observable;
第二步:创建观察者Observer;
第三步:被观察者Observable订阅Observer;
当订阅成功之后,被观察者的subscribe方法里面,就可以通过发射器发射各种事件,最终在观察者的方法里进行回调;
RxJava也是观察者和被观察者订阅的过程,只是被观察者有变化的时候,是通过发射器来发射各种事件的,这样就不局限于一种事件了;
3.1、装饰者模式
什么是装饰者模式?
要理解这个模式其实不难,我们从“装饰”这两个字就可以看出,这个模式用于装饰用的,至于怎么装饰,且听我细细道来;
比如说我现在有一个手机,我怎么在不改变这个手机原有的结构,而让其具有防摔的功能,当然你也可以说我的手机是诺基亚,从几楼往下丢,手机都不带磕碰的,但是现实是,我们使用的手机,并不是那么的抗摔;
那么我要怎么让其具有更强的抗摔能力呢?
相信答案你已经很清楚了,就是套手机壳,贴膜,而这两个动作,是在没有改变手机原有的结构上,让其具有了抗摔的功能,而这个过程可以称为装饰,而装饰者模式的原理也是如此;
在不改变其原有结构的基础上,为其添加额外的功能,是作为其原有结构的包装,这个过程称为装饰;
那么在代码里是怎么体现出来的呢?
同理,假如我们要在一个类上添加新功能,而不修改其原有的逻辑,那么我们这时候就可以使用装饰者模式进行封装,具体怎么做,我们下面来看看;
还是以上面为例子,定义一个外观的接口Appearance,有一个抽象的方法,结构structure;
public interface Appearance {
void structure();
}
然后再定义一个手机类Phone实现这个接口,这个手机的结构有玻璃后盖,金属边框等属性,如下:
public class Phone implements Appearance {
@Override
public void structure() {
// 手机属性:玻璃后盖,金属边框
Log.i("TAG", "手机的属性:玻璃后盖,金属边框");
}
}
好了,接下来我们要让这个手机变得更坚固,但是又不能改变手机原有的结构,那么我们要怎么做呢?
如果不能修改其原有的结构,那么我可以通过装饰来对手机进行包装,先定义一个手机的包装类,用来包装手机,命名为PhoneDecorator,实现了Appearance接口,在这里通过构造方法传进来的外观类Appearance,调用了外观类的structure方法,保证其原有的功能实现;
简单来说,这个类的作用,就是为了实现原有类的功能;
public abstract class PhoneDecorator implements Appearance {
protected Appearance appearance;
public PhoneDecorator(Appearance appearance) {
this.appearance = appearance;
}
@Override
public void structure() {
appearance.structure();
}
}
那么接下来就是包装类的具体实现了,定义一个套手机壳功能的类PhoneShell,功能实现就是在原有功能的基础上,给手机套上手机壳,来看看具体实现吧;
public class PhoneShell extends PhoneDecorator{
public PhoneShell(Appearance appearance) {
super(appearance);
}
@Override
public void structure() {
super.structure();
Log.i("TAG", "给手机套上手机壳");
}
}
这里的实现很简单,继承手机的包装类,在structure里面去实现“套上手机壳”的操作;
那么套手机壳的类有了,还差一个贴膜的类,和手机壳一样,我们也来定义一个贴膜的包装类PhoneCover,看看具体实现;
public class PhoneCover extends PhoneDecorator{
public PhoneCover(Appearance appearance) {
super(appearance);
}
@Override
public void structure() {
super.structure();
Log.i("TAG", "给手机贴上钢化膜");
}
}
这里的实现和上面的套手机壳的操作一样,那么到这里两个包装类都写好了,我们来看看怎么调用吧;
private void decorator() {
// 创建一个手机
Phone phone = new Phone();
// 给手机套上手机壳
PhoneShell phoneShell = new PhoneShell(phone);
// 给手机贴上钢化膜
PhoneCover phoneCover = new PhoneCover(phoneShell);
// 最终的手机结构
phoneCover.structure();
}
使用起来很简单,将需要包装的类,作为构造参数,传入到包装类里面,就可以让这个类具有包装的功能,比如这里,将手机Phone传入到手机壳PhoneShell的类里面,那么手机就有套上手机壳的功能了;
同理,再将套上手机壳的手机PhoneShell类,传入到贴膜的类PhoneCover里面,那么这个手机就具有了贴膜的功能,最后再调用一下结构的方法structure,那么就可以看到这个手机已经被套上手机壳,并且贴上膜了;
最终包装后的结构如下:
到这里你有没有发现,装饰者对于功能的扩展并不是使用的继承的方法,为什么?
因为继承随着功能的增加,会导致子类越来越膨胀,而装饰者模式的双方可以随意扩展,不会相互耦合;
那么RxJava的装饰者模式是怎么实现的呢?
且听我细细道来;
3.2、RxJava的装饰者模式是怎么实现的呢?
RxJava的装饰者主要是用于实现被观察者Observable和观察者Observer的包装,为什么要进行包装呢?
从上面我们可以知道,装饰者模式,是基础功能上,不修改其原有的逻辑,进行扩展;
那么为什么RxJava的被观察者需要这种特性呢?
假如我想实现这样一种功能,在子线程获取数据,然后切换到主线程进行数据的赋值,正常情况下我们会这样做,先在子线程获取数据,然后再通过Handler的post方法,切到主线程;
但是如果我想在子线程获取到数据后,然后再对数据做一下转化处理,最后再回调给主线程呢?
如果按照常规的实现逻辑,这样的代码就会很混乱,作为一名有追求的工程师,我们是无法忍受这样的写法的;
那么有没有什么方式可以变的优雅一些呢?
答案是:有的;
RxJava通过装饰者模式+观察者模式设计出了链式调用的效果,这样代码逻辑清晰,也方便维护;
比如下面这样的链式调用逻辑:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) {
// 发射器发射数据
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onComplete();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer s) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
这样的链式调用逻辑是不是很清晰!
下面我们来看看RxJava的装饰者模式具体是怎么实现的;
我们以上面讲的手机模型为例子,来一步步拆解RxJava的装饰器模式;
(1) 被观察者Observable:
第一步:要有一个抽象接口对应上面的Appearance接口,而RxJava的接口是ObservableSource,里面有一个方法,subscribe,用于和观察者进行订阅;
public interface ObservableSource<T> {
/**
* Subscribes the given Observer to this ObservableSource instance.
* @param observer the Observer, not null
* @throws NullPointerException if {@code observer} is null
*/
void subscribe(@NonNull Observer<? super T> observer);
}
第二步:要有一个包装类,实现了ObservableSource接口,对应上面的PhoneDecorator包装类,RxJava的包装类是Observable,和PhoneDecorator同理,实现了对应的接口,并且在subscribe方法里面通过调用抽象方法subscribeActual,来对观察者进行订阅;
public abstract class Observable<T> implements ObservableSource<T> {
@Override
public final void subscribe(Observer<? super T> observer) {
...
subscribeActual(observer);
...
}
protected abstract void subscribeActual(Observer<? super T> observer);
}
第三步:接下来就是具体的包装类了,和上面一样具有包装功能的PhoneShell,PhoneCover等等,而RxJava的包装类,非常强大,先看个图;
有一百多个包装类,由此可以看出RxJava的强大,当然也是装饰者模式给与其易扩展的特性;
上面看完被观察者的逻辑,下面来看看观察者的装饰者逻辑;
(2)观察者Observer:
第一步:要有一个抽象接口对应上面的Appearance接口,而RxJava的接口是Emitter和Observer,里面有好几个方法,基本一样,onNext,onError,onComplete,用于被观察者进行回调;
第二步:要有一个包装类,实现了Emitter或者Observer接口,观察者比较特殊,没有一个基础的包装类,而是直接封装了很多的包装类;
也是有100多个包装类;
那么到这里你是否会有疑惑,被观察者和观察者这么多的包装类,到底要咋用?
从上面的例子,可以知道,包装类是有一个过程的,有的是在创建的时候就进行包装了,而有的是在调用的时候进行包装的;
而RxJava的被观察者是在创建的时候进行包装的,比如上面的示例代码,第一步,通过Observable.create方法,里面通过创建ObservableCreate对象,进行了第一层的包装,此时的结构如下:
第二步的subscribeOn方法调用时,进行了第二层的包装,此时的结构如下:
第三步的observeOn方法调用时,进行了第四层的包装,那么结构就是下面这样样子:
最终调用订阅的方法的时候,已经进行了四次包装,可以这么理解,每调动一次操作符,那么就会进行一层被观察者的包装;
这样包装的好处是什么呢?
前面我们讲过,装饰者模式,是为了在不改变其原有的基础上,添加额外的功能;
这进行了这么几次包装的作用,就是为了添加额外的功能,那么来大概看一下每一层添加的额外功能有啥?
3.3、被观察者的subscribe方法
当我们最终调用了subscribe方法之后,我们会从最外层的包装类,一步一步的往里面调用;
上面我们知道,被观察者的包装,是在subscribeActual方法里,进行实现的,那么我们来看看这几个包装类的subscribeActual方法的逻辑;
先来看最外层的包装,来看一下subscribeActual大概的逻辑:
这里的source是上一层包装类的实例,也就是ObservableSubscribeOn;
这里会将观察者进行一层包装,也就是ObserveOnObserver,这个ObserveOnObserver的包装,里面实现了线程切换的逻辑,具体逻辑在onNext里面;
为什么要这么做呢?因为这就是装饰者模式带来的好处,这个onNext的被观察者通知观察者会回调的方法,然后这里通过包装类,在里面实现了额外的线程切换的功能,这里会切换到主线程去执行;
此时,观察者的结构是这样的:
下面我们来看倒数第二层的包装类的subscribeActual方法的逻辑,倒数第二个包装类是ObservableSubscribeOn;
这一层包装类的subscribeActual方法又对观察者做了一层包装,也就是SubscribeOnObserver类,这个包装类又实现了什么功能呢?
这里做了一些线程的释放,这个我们下面再讲;
包装完之后观察者的结构是这样的:
让我们回到被观察者的实现逻辑,下面就调用了执行线程的方法,scheduleDirect,如果你传进来的是子线成的线程调度器Scheduler,那么SubscribeTask就会在子线程执行,而我们这里传的就是子线程;
在SubscribeTask,又调用了subscribe方法,这个source是上一层的包装类,也就是ObservableCreate,那么ObservableCreate的subscribeActual方法,就会在子线程执行了;
下面我们来看看ObservableCreate这个包装类的subscribeActual方法;
这里对观察者做了一层包装,也是CreateEmitter类,来看这个观察者的包装类又实现了什么额外的功能呢?
这里面主要实现了判断线程是否释放了,如果释放了,那么观察者就不再进行回调;
那么这时候,观察者的结构是这样的:
接下来就调用了观察者的onSubscribe方法,最终会回调到观察者Observer的onSubscribe方法;
然后下面就调用了source.subscribe(parent),这个source是我们创建的最原始的ObservableOnSubscribe,这里会回调到ObservableOnSubscribe的subscribe方法;
此时,我们上面的包装类ObservableSubscribeOn,切换到子线程后,那么我们的ObservableOnSubscribe的subscribe方法的执行也是在子线程;
3.4、被观察者通知观察者的事件是怎么流向的呢?
然后我们在ObservableOnSubscribe的subscribe方法里调用了发射器,发射字符串,那么这时候的调用逻辑是怎样的呢?
从上面观察者的结构来看,当发射器发送事件时,会一层层的回调对应的观察者包装类,从最外面一层开始;
上面我们知道ObservableEmitter类是CreateEmitter对观察者的包装,那么这个onNext就会走CreateEmitter的onNext方法,上面我们知道这个方法只是做了判断,最终还是回调给上一层的包装类的onNext方法;
再上一层的包装类是SubscribeOnObserver,这个方法的onNext没有对观察者做任何处理;
那么还是得继续往上一层的包装类进行查看,上一层的包装类是ObserveOnObserver,这个类的onNext方法,执行了线程的切换,最终切换到主线程执行;
那么最终回调到这里之后,就是在主线程执行了;
我们在创建被被观察者的时候,会对被观察者做一层包装, 创建几次就包装几次,然后再被观察者调用subscribe方法时,一层层回调被观察者的subscribeActual方法,而在被观察者的subscribeActual方法里,会对观察者做一层包装;
也就是说被观察者是在创建的时候进行包装,然后在subscribeActual方法里实现额外的功能;
而观察者是在被观察者调用subscribeActual方法里进行包装的,然后针对观察者实现自己额外的功能;
下面我们来看一下流程图:
那么到这里,RxJava底层架构是不是已经清晰明了了,总结起来就是观察者模式+装饰者模式;
通过装饰者模式来包装观察者和被观察者,然后在包装类里面实现额外的功能;
那么最终的架构如下:
第一步:创建被观察者时,或者使用操作符时,会对被观察者进行包装:
第二步:被观察者订阅观察者,这时候会一层层的回调被观察者的包装类的subscribeActual方法,然后对观察者进行包装;
此时,被观察者的功能实现是在subscribeActual方法里,而观察者的实现是在包装类里;
第三步:被观察者和观察者不同的是,被观察者是在订阅成功之后,就执行了包装类相应的功能,而观察者是在事件回调的时候,会在观察者的包装类里实现对应的功能;
最终流程图:
5.1、事务的概念
在开始之前,我们先来了解一个概念,“事务”;
什么是事务?
事务,一般是指要做的或所做的事情。而在代码里我们可以理解为一段代码逻辑;
而事务的关系,我们可以理解为业务逻辑之间的关系,有可能有关联,也有可能没有关联;
比如我进入一个列表页,这个列表页的数据,需要从好几个接口请求返回的,而请求返回后我还要根据数据和网络来展示对应的页面,比如列表页或者无数据,无网络的页面,那么我展示的逻辑就是根据列表页返回的逻辑来展示的;
而这几个请求,我姑且称为事务A,事务B,事务C,事务D;
事务A,事务B,事务C分别对应请求网络接口的数据,而事务D则是根据返回的数据处理展示的逻辑;
那么我正常的处理逻辑可能是这样,在子线程去处理这三个事务A,事务B,事务C,最终等都处理完了之后,再处理事务D,而这样写的坏处就是我把这几个接口的数据都放在一个子线程去执行了,那么最终结果就是会导致加载缓慢;
那么我们是否可以换成另外一种写法,事务A,事务B,事务C分别在三个子线程去执行,然后最终在三个子线程的回调里面去判断这几个接口是否已经加载完毕了,这样可以解决上面的问题,但是如果以后还有新增的事务,那么最终会导致判断的逻辑越来越臃肿;
而RxJava提供了响应式编程的思想,可以解决这类问题;
5.2、响应式编程
什么是响应式编程?
响应式编程是一种通过异步和数据流来构建事务关系的编程模型
我们可以理解为由事件来驱动事务,比如我请求网络数据成功了,发送请求成功的事件通知下一层事务进行处理;
而RxJava提供了一系列的特性,比如我们可以对事务进行变换,串联,组装等来对事务进行操作,比如上面的事务A,事务B,事务C,我们可以通过组装的方式来进行处理;
那么最终RxJava的处理逻辑如下:
将事务A,事务B,事务C进行组装,等处理完毕了之后,最终会发送一个事件通知事务D进行处理;
RxJava响应式编程带来了什么好处?
1、大幅度降低事务之间的耦合性,方便后期维护与扩展;
2、简化复杂的线程操作,让我们专注于业务开发,避免了很多线程并发的问题,比如线程死锁;
3、提供了丰富的操作符,让我们对于事务的处理更加方便;
4、对于复杂的业务,我们可以构建出清晰的代码逻辑,方便理解业务逻辑;
5.3、RxJava的架构对于响应式编程的思考
RxJava底层通过观察者模式来处理的事件传递,通过装饰者模式来处理事务的操作,由这两个设计者模式来构建了响应式编程的思想,并且装饰者模式还保证了其灵活的扩展性,比如我以后要新增一个操作符,只需要实现对应的观察者和被观察者的包装类即可;
RxJava不仅仅是一个异步框架,还提供了我们处理事务的能力,把复杂的逻辑通过响应式编程的思想,变得更清晰易懂,这让我们对于复杂业务的处理更加的得心应手;
这是非常优秀的源码,也很感叹作者奇妙的思路,很值得我们去学习;
当我们掌握了RxJava的核心原理后,那么无论源码再怎么更新,也脱离不了这个架构思想,当我们带着这个架构思想去看源码细节的时候,架构思想就是你的灯塔,让你不会迷失在茫茫的码海里😊;
关于我
兄dei,如果我的文章对你有帮助的话,请帮我点个赞吧️,也可以关注一下我的Github和博客;